In [ ]:
from IPython.display import Image, SVG

Data Creation

Accounts

  • Create several gzipped files
  • Each line in each file is a JSON encoded dictionary with the following keys
    • id: Unique identifier of the customer
    • name: Name of the customer
    • transactions: List of transaction-id, amount pairs, one for each transaction for the customer in that file
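  • For illustration only (the exact serialized layout is an assumption, not taken from the accounts module), one line might look roughly like: {"id": 0, "name": "Alice", "transactions": [["tx-0001", 250], ["tx-0002", 995]]}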

In [ ]:
from accounts import create_accounts_json

num_files = 25
n = 100000  # number of accounts per file
k = 500  # number of transactions

create_accounts_json(num_files, n, k)

Denormalize NFS Data

  • The NFS data is normalized to eliminate redundancy

In [ ]:
from nfs import create_denormalized

create_denormalized()

Random Array

  • Create a billion number array of 32-bit floats on disk using HDF5
  • HDF5 is an implementation of the Hierarchical Data Format common in scientific applications
    • Multiple data formats (tables, nd-arrays, raster images)
    • Fast lookups via B-tree indices (like SQL)
    • Filesystem-like data format
    • Support for meta-information
  • The result of this operation is 4 GB

In [ ]:
from random_array import random_array
random_array()
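  • A rough sketch of what such a helper might do (the file name, dataset path, and sizes below are assumptions, not the actual implementation):

In [ ]:
import numpy as np
import h5py

# Write one billion float32 values to HDF5 in 1,000,000-element chunks,
# so the full array never has to fit in memory at once.
with h5py.File('random.hdf5', 'w') as f:
    dset = f.create_dataset('/x', shape=(1000000000,), dtype='float32',
                            chunks=(1000000,))
    for i in range(0, 1000000000, 1000000):
        dset[i:i + 1000000] = np.random.random(1000000)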

Dask

Introduction

Dask is a flexible parallel computing library for analytics. Dask emphasizes the following virtues:

  • Familiar: Provides parallelized NumPy array and Pandas DataFrame objects
  • Native: Enables distributed computing in Pure Python with access to the PyData stack.
  • Fast: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms
  • Flexible: Supports complex and messy workloads
  • Scales up: Runs resiliently on clusters with 100s of nodes
  • Scales down: Trivial to set up and run on a laptop in a single process
  • Responsive: Designed with interactive computing in mind, it provides rapid feedback and diagnostics to aid humans

The Dask Computational Model

  • Parallel programming with task scheduling
  • Familiar abstractions for executing tasks in parallel on data that doesn't fit into memory
    • Arrays, DataFrames
  • Task graphs
    • Representation of a parallel computation (see the sketch after this list)
  • Scheduling
    • Executes task graphs in parallel on a single machine using threads or processes
    • Preliminary support for parallel execution using dask.distributed
      • Workflows for the distributed scheduler would be quite different from those presented below
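  • Under the hood, a task graph is just a Python dict mapping keys to values or to task tuples; here is a minimal hand-built sketch, executed with the synchronous scheduler:

In [ ]:
from operator import add

import dask

# A hand-built task graph: 'z' depends on 'x' and 'y'; 'w' depends on all three.
dsk = {'x': 1,
       'y': 2,
       'z': (add, 'x', 'y'),
       'w': (sum, ['x', 'y', 'z'])}

dask.get(dsk, 'w')  # executes the graph synchronously -> 6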

Note

  • If you don't have a big data problem, don't use a big data tool
  • Many of the below examples could easily be handled in-memory with some better choices

In [ ]:
Image("http://dask.pydata.org/en/latest/_images/collections-schedulers.png")

Dask Array

  • Subset of ndarray interface using blocked algorithms
  • Dask array complements large on-disk array stores like HDF5, NetCDF, and BColz

In [ ]:
SVG("http://dask.pydata.org/en/latest/_images/dask-array-black-text.svg")
  • Arithmetic and scalar mathematics, +, *, exp, log, ...
  • Reductions along axes, sum(), mean(), std(), sum(axis=0), ...
  • Tensor contractions / dot products / matrix multiply, tensordot
  • Axis reordering / transpose, transpose
  • Slicing, x[:100, 500:100:-2]
  • Fancy indexing along single axes with lists or numpy arrays, x[:, [10, 1, 5]]
  • The array protocol __array__
  • Some linear algebra svd, qr, solve, solve_triangular, lstsq
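  • A small illustrative combination of these operations (a sketch on random data):

In [ ]:
import dask.array as da

x = da.random.normal(0, 1, size=(4000, 4000), chunks=(1000, 1000))
y = (x + x.T) - x.mean(axis=0)   # elementwise arithmetic plus a reduction
z = y[::10, 100:200].std()       # slicing followed by another reduction
z.compute()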

Full API Documentation


In [ ]:
import dask.array as da
  • The idea of the chunk is important and has performance implications

In [ ]:
x = da.arange(25, chunks=5)
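  • You can inspect the block structure via the chunks attribute, or change it with rechunk (a quick illustration):

In [ ]:
x.chunks      # ((5, 5, 5, 5, 5),) -- five blocks of five elements each

In [ ]:
x.rechunk(10).chunks   # ((10, 10, 5),) -- fewer, larger blocks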

In [ ]:
y = x ** 2
  • Dask operates on a delayed computation model
  • It builds up an expression of the computation in chunks
  • Creates a Task Graph that you can explore

In [ ]:
y

In [ ]:
y.visualize()

In [ ]:
y.dask.keys()
  • You can execute the graph by using compute

In [ ]:
y.compute()
  • As an example of the __array__ protocol

In [ ]:
import numpy as np

np.array(y)

Scheduling Backends

  • You can control the scheduler backend that is used by compute
  • These choices can be important in a few situations
    • Debugging
    • Fast tasks
    • Cross-task communication
  • dask.get is an alias for the synchronous backend. Useful for debugging.

Synchronous Queue Scheduler


In [ ]:
import dask

y.compute(get=dask.get)

Threaded Scheduler

  • dask.threaded.get is the default
  • Uses a thread pool backend
  • A thread is the smallest unit of work that an OS can schedule
    • Threads are "lightweight"
  • They execute within the same process and thus share the same memory and file resources (everything is a file in Unix)
  • Limitations
    • Limited by the Global Interpreter Lock (GIL)
      • The GIL means that only one thread can execute Python bytecode at a time
    • Pure python functions likely won't show a speed-up (with a few exceptions)
      • C code can release the GIL
      • I/O tasks are not blocked by the GIL

In [ ]:
y.compute(get=dask.threaded.get)
  • By default, dask will use as many threads as there are logical processors on your machine

In [ ]:
from multiprocessing import cpu_count
cpu_count()
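  • To use fewer threads, you can pass num_workers through compute (an illustrative sketch; the keyword is forwarded to the threaded scheduler):

In [ ]:
y.compute(get=dask.threaded.get, num_workers=2)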

Process Scheduler

  • Backend that uses multiprocessing
  • Uses a process pool backend
    • On Unix-like systems this is a fork system call
    • Calling fork creates a new child process which is a copy(-on-write) of the parent process
    • Each child process owns its own resources, which makes processes "heavy"
  • Limitations
    • Relies on serializing objects for the workers (slow and error prone)
    • Workers must communicate through parent process

In [ ]:
import dask.multiprocessing

y.compute(get=dask.multiprocessing.get)

Distributed Executor

  • This is part of the dask.distributed library
  • Distributes work over the network across machines using web sockets and an asynchronous web framework for Python (Tornado)
    • Some recent additions make this work for, e.g., distributed DataFrames

Blocked Algorithms

  • Dask works on arrays by executing blocked algorithms on chunks of data
  • For example, consider taking the mean of a billion numbers. We might instead break up the array into 1,000 chunks, each of size 1,000,000, take the sum of each chunk, and then take the sum of the intermediate sums and divide this by the total number of observations.
  • The final result (one sum over one billion numbers) is built up from many smaller results (one thousand sums over one million numbers each, followed by one more sum of the thousand intermediate sums)

In [ ]:
import h5py
import os

f = h5py.File(os.path.join('..', 'data', 'random.hdf5'), mode='r')
dset = f['/x']
  • If we were to implement this ourselves, it might look like the following:
  1. Computing the sum of each 1,000,000 sized chunk of the array
  2. Computing the sum of the 1,000 intermediate sums

In [ ]:
sums = []
for i in range(0, 1000000000, 1000000):
    chunk = dset[i: i + 1000000]
    sums.append(chunk.sum())

total = np.sum(sums)
print(total / 1e9)
  • Dask does this for you and uses the backend scheduler to do so in parallel
  • Create a dask array from an array-like structure (any object that implements numpy-like slicing)

In [ ]:
x = da.from_array(dset, chunks=(1000000, ))
  • x looks and behaves much like a numpy array
    • Arithmetic, slicing, reductions
  • Use tab-completion to look at the methods of x

In [ ]:
result = x.mean()

In [ ]:
result

In [ ]:
result.compute()

In [ ]:
x[:10].compute()

Exercise

Use dask.array.random.normal to create a 20,000 x 20,000 array $X \sim N(10, 0.1)$ with chunks set to (1000, 1000)

Take the mean of every 100 elements along axis 0.

Hint: Recall you can slice with the following syntax [start:end:step]


In [ ]:
# [Solution here]

In [ ]:
%load solutions/dask_array.py

Performance vs. NumPy

Your performance may vary. If you attempt the NumPy version then please ensure that you have more than 4GB of main memory.


In [ ]:
import numpy as np

In [ ]:
%%time 
x = np.random.normal(10, 0.1, size=(20000, 20000)) 
y = x.mean(axis=0)[::100] 
y

Faster, and needs only megabytes of memory


In [ ]:
%%time
x = da.random.normal(10, 0.1, size=(20000, 20000), chunks=(1000, 1000))
y = x.mean(axis=0)[::100] 
y.compute()

Linear Algebra

  • Dask implements a few linear algebra functions that are parallelizable:
  • da.linalg.qr
  • da.linalg.cholesky
  • da.linalg.svd

Dask Bag

  • Parallel lists for semi-structured data
    • Nested, variable length, heterogeneously typed, etc.
    • E.g., JSON blobs or text data
  • Anything that can be represented as a large collection of generic Python objects
  • Mainly for cleaning and processing
    • I.e., usually the first step in a workflow
  • Bag implements a number of useful methods for operating on sequences, such as map, filter, fold, frequencies, and groupby
  • Streaming computation on top of generators
  • Bags use the multiprocessing backend by default
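  • A minimal illustration of chaining these methods on a small in-memory sequence:

In [ ]:
import dask.bag as db

b = db.from_sequence(range(10))
b.filter(lambda n: n % 2 == 0).map(lambda n: n ** 2).sum().compute()  # 120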

Example

  • Using the accounts data we created above

In [ ]:
import os
import dask.bag as db

In [ ]:
bag = db.read_text(os.path.join('..', 'data', 'accounts.*.json.gz'))

In [ ]:
bag.take(3)
  • Using map to process the lines in the text files

In [ ]:
import json

In [ ]:
js = bag.map(json.loads)

In [ ]:
js.take(3)

In [ ]:
counts = js.pluck('name').frequencies()

In [ ]:
counts.compute()

Exercise

  • Use filter and take all of the transactions for the first five users named "Alice"
  • Define a function count_transactions that takes a dictionary from accounts and returns a dictionary holding the name and a count key giving the number of transactions for that user id.
  • Use filter to get the accounts where the user is named Alice and map the function you just created to get the number of transactions for each user named Alice. pluck the count and display the first 5.

In [ ]:
%load solutions/bag_alice.py

GroupBy / FoldBy

  • Groupby collects items in your collection so that all items with the same value under some function are collected together into a key-value pair.
  • This requires a full on-disk shuffle and is very inefficient
  • You almost never want to do this in a real workflow if you can avoid it

In [ ]:
b = db.from_sequence(['Alice', 'Bob', 'Charlie', 'Dan', 'Edith', 'Frank'])
b.groupby(len).compute()
  • Group by evens and odds

In [ ]:
b = db.from_sequence(list(range(10)))
b.groupby(lambda x: x % 2).compute()

Group by evens and odds and take the largest value


In [ ]:
b.groupby(lambda x: x % 2).map(lambda kv: (kv[0], max(kv[1]))).compute()
  • foldby, while harder to grok, is much more efficient
  • This does a streaming combined groupby and reduction
  • Familiar to Spark users as the combineByKey method on RDD

When using foldby you provide

  1. A key function on which to group elements
  2. A binary operator, such as you would pass to reduce, that performs the reduction within each group
  3. A combine binary operator that can combine the results of two reduce calls on different parts of your dataset.

Your reduction must be associative. It will happen in parallel in each of the partitions of your dataset. Then all of these intermediate results will be combined by the combine binary operator.

This is just like what we saw with sum above

  • functools.reduce works like so

In [ ]:
import functools

In [ ]:
values = range(10)

In [ ]:
def func(acc, y):
    print(acc)
    print(y)
    print()
    return acc + y

In [ ]:
functools.reduce(func, values)

In [ ]:
b.foldby(lambda x: x % 2, binop=max, combine=max).compute()

Using the accounts data above, find the number of people with the same name


In [ ]:
js.take(1)

In [ ]:
from dask.diagnostics import ProgressBar

In [ ]:
counts = js.foldby(key='name',
                   binop=lambda total, x: total + 1,
                   initial=0,
                   combine=lambda a, b: a + b,            
                   combine_initial=0)

In [ ]:
with ProgressBar():
    result = counts.compute()

In [ ]:
result

Exercise

  • Compute the total amounts for each name
  • First, create a function that computes the total for each user id
  • Change the above example to accumulate the total amount instead of count

In [ ]:
%load solutions/bag_foldby.py

Dask DataFrame

  • Subset of the pandas API
  • Good for analyzing heterogeneously typed tabular data arranged along an index

Trivially parallelizable operations (fast):

  • Elementwise operations: df.x + df.y, df * df
  • Row-wise selections: df[df.x > 0]
  • Loc: df.loc[4.0:10.5]
  • Common aggregations: df.x.max(), df.max()
  • Is in: df[df.x.isin([1, 2, 3])]
  • Datetime/string accessors: df.timestamp.month

Cleverly parallelizable operations (fast):

  • groupby-aggregate (with common aggregations): df.groupby(df.x).y.max(), df.groupby('x').max()
  • value_counts: df.x.value_counts()
  • Drop duplicates: df.x.drop_duplicates()
  • Join on index: dd.merge(df1, df2, left_index=True, right_index=True)
  • Join with Pandas DataFrames: dd.merge(df1, df2, on='id')
  • Elementwise operations with different partitions / divisions: df1.x + df2.y
  • Datetime resampling: df.resample(...)
  • Rolling averages: df.rolling(...)
  • Pearson Correlations: df[['col1', 'col2']].corr()

Operations requiring a shuffle (slow-ish, unless on index)

  • Set index: df.set_index(df.x)
  • groupby-apply (with anything): df.groupby(df.x).apply(myfunc)
  • Join not on the index: dd.merge(df1, df2, on='name')
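  • A quick illustration of the first two categories on a toy DataFrame (a sketch, not part of the survey data):

In [ ]:
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'x': range(8), 'y': [1, 2] * 4})
ddf = dd.from_pandas(pdf, npartitions=2)

(ddf.x + ddf.y).compute()           # elementwise: trivially parallel

In [ ]:
ddf.groupby('y').x.max().compute()  # groupby-aggregate: cleverly parallel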

Reading data


In [ ]:
import dask.dataframe as dd

In [ ]:
df = dd.read_csv("../data/NationalFoodSurvey/NFS*.csv")
  • DataFrame.head is one operation that is not lazy

In [ ]:
df.head(5)

Partitions

  • By default the data is partitioned by the file
  • In our case, this is good. The files have a natural partition
  • When this is not the case, you must do a disk-based shuffle which is slow

In [ ]:
df.npartitions

In [ ]:
df.known_divisions
  • We are going to set the partitions explicitly on styr to make some operations more performant
  • Partitions are denoted by the left-side of the bins for the partitions.
  • The final value is assumed to be the inclusive right-side for the last bin.

So

[1974, 1975, 1976]

Would be 2 partitions. The first contains 1974. The second contains 1975 and 1976. To get three partitions, one for the final observation, duplicate it.

[1974, 1975, 1976, 1976]
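  • As a quick illustration of divisions on a toy frame (a sketch; dd.from_pandas derives the divisions from the sorted index):

In [ ]:
import pandas as pd
import dask.dataframe as dd

toy = pd.DataFrame({'v': range(6)}, index=[1974, 1974, 1975, 1975, 1976, 1976])
dd.from_pandas(toy, npartitions=2).divisions   # e.g. (1974, 1975, 1976)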

In [ ]:
partitions = list(range(1974, 2001)) + [2000]

df = df.set_partition('styr', divisions=partitions)

In [ ]:
df.known_divisions

In [ ]:
df.divisions
  • Nothing is loaded into memory yet
  • Meta-information from pandas is available

In [ ]:
df.info()

DataFrame API

  • In addition to the (supported) pandas DataFrame API, dask provides a few more convenient methods

    • DataFrame.categorize
    • DataFrame.map_partitions
    • DataFrame.get_division
    • DataFrame.repartition
    • DataFrame.set_partition
    • DataFrame.to_{bag|castra}
    • DataFrame.visualize
  • A few methods have a slightly different API

    • DataFrame.apply
    • GroupBy.apply

get_division


In [ ]:
df2000 = df.get_division(26)

In [ ]:
type(df2000)

What food group was consumed the most times in 2000?


In [ ]:
df2000.set_index('minfd')
  • NOTE: We could speed up subsequent operations by setting partitions

In [ ]:
grp = df2000.groupby('minfd')

In [ ]:
size = grp.apply(len, columns='size')

In [ ]:
size.head()
  • There isn't (yet) support for idxmin/idxmax.
  • Turn it into a Series first

In [ ]:
minfd = size.compute().idxmax()

In [ ]:
print(minfd)
  • Get the pre-processed mapping across food grouping variables

In [ ]:
import pandas as pd

food_mapping = pd.read_csv("../data/NationalFoodSurvey/food_mapping.csv")
  • Pandas provides the efficient isin method

In [ ]:
food_mapping.loc[food_mapping.minfd.isin([minfd])]

Exercise

  • What was the most consumed food group in 1974?

In [ ]:
# [Solution here]

In [ ]:
%load solutions/nfs_most_purchased.py

map_partitions

  • Map partitions does what you might expect
  • Maps a function across partitions
  • Let's calculate the most frequently purchased food group for each year

In [ ]:
def most_frequent_food(partition):
    # partition is a pandas.DataFrame
    grpr = partition.groupby('minfd')
    size = grpr.size()
    minfd = size.idxmax()
    idx = food_mapping.minfd.isin([minfd])
    description = food_mapping.loc[idx].minfddesc.iloc[0]
    year = int(partition.styr.iloc[0])
    return year, description

In [ ]:
mnfd_year = df.map_partitions(most_frequent_food)

In [ ]:
mnfd_year.compute()

In [ ]:
zip(mnfd_year.compute(),)

Exercise

  • Within each year, group by household minfd and calculate daily per capita consumption of each food group. Hint, you want to use map_partitions.

In [ ]:
%load solutions/average_consumption.py

Aside on Storage Formats: Thinking about Computers

blosc

  • Meta-compression format for (binary) data
  • Cache-aware
  • Querying data on disk with blosc (bcolz) can be faster than working with pandas in memory
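  • A minimal sketch of blosc compression via bcolz (assuming bcolz is installed; the parameters are illustrative):

In [ ]:
import numpy as np
import bcolz

a = np.random.normal(10, 0.1, size=(10000000,))
ca = bcolz.carray(a, cparams=bcolz.cparams(clevel=5, cname='blosclz'))
ca.nbytes / float(ca.cbytes)   # compression ratio achieved by blosc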

In [ ]:
Image('images/bcolz_bench.png')

Dask Resources